<?php
/**
 * Created by PhpStorm.
 * User: evenvi
 * Date: 16-8-10
 * Time: 3:36 PM
 */

namespace Evenvi\Mqtt;

include "MqttClientInterface.php";
include "config.php";
include "Common.php";

/**
 * Asynchronous MQTT 3.1 client built on top of a Swoole async TCP client.
 *
 * The constructor opens the TCP connection immediately; once the socket is
 * connected a CONNECT packet is sent, and incoming packets are dispatched to
 * the private deal*() handlers by packet type. pub()/sub() requests issued
 * before the connection is up are remembered and replayed after a successful
 * CONNACK.
 *
 * NOTE(review): the receive handler assumes each "receive" event carries
 * exactly one complete MQTT packet; TCP may coalesce or split packets, which
 * is not handled here.
 */
class MqttClient implements MqttClientInterface
{

    /* asynchronous swoole TCP client used to talk to the broker */
    private $tcpClient;

    private $msgid = 1;			/* counter for message id (16 bit, never 0) */
    public $keepalive = 20;		/* keepalive (heartbeat) interval in seconds */
    public $timesinceping;		/* host unix time of the last packet received, used to detect disconnects */
    public $topics = array(); 	/* currently subscribed topics, keyed by topic filter */
    public $debug = false;		/* should output debug messages */
    public $address;			/* broker address */
    public $port;				/* broker port */
    public $clientid;			/* client id sent to broker */
    public $will;				/* stores the will of the client */
    private $username;			/* stores username */
    private $password;			/* stores password */

    /* true while a sub()/pub() request is waiting to be (re)sent after CONNACK */
    private $setSub = false;
    private $setPub = false;

    /* arguments of the last pub()/sub() call, used for the replay after CONNACK */
    private $pubPara = array();
    private $subPara = array();

    /**
     * Resolves the broker host, creates the async TCP client and starts connecting.
     *
     * @param string $host broker host name or IP address
     * @param int    $port broker TCP port
     */
    public function __construct($host = '127.0.0.1', $port = 1883)
    {
        $this->address = gethostbyname($host);
        $this->port = $port;
        $this->tcpClient = new \swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);

        $this->__init();
    }

    /**
     * Registers the connect/receive/close/error callbacks on the TCP client
     * and initiates the connection. Terminates the script if connecting fails.
     */
    private function __init()
    {
        $this->tcpClient->on("connect", function ($cli) {
            // Honour a client id assigned to $this->clientid before the
            // connection was established; otherwise keep the historic default.
            $clientId = ($this->clientid !== null && $this->clientid !== '')
                ? $this->clientid
                : 'connect_id123';
            $this->connect($cli, $clientId);
        });

        $this->tcpClient->on("receive", function ($cli, $data = "") {
            // Nothing to decode: avoid reading offset 0 of an empty string.
            if (!is_string($data) || $data === '') {
                return false;
            }

            // The packet type lives in the upper nibble of the first byte.
            $messageType = ord($data[0]) >> 4;
            echo "MessageType: " . $messageType . "\n";
            switch ($messageType) {
                case (MQTT_CONNACK):
                    $this->dealConnACK($data);
                    break;
                case (MQTT_PINGRESP):
                    $this->dealPingResp($data);
                    break;
                case (MQTT_PUBACK):
                    $this->dealPubACK($data);
                    break;
                case (MQTT_SUBACK):
                    $this->dealSubACK($data);
                    break;
                case (MQTT_PUBLISH):
                    $this->dealPublish($data);
                    break;
                case (MQTT_PUBREC):
                    $this->dealPubREC($data);
                    break;
                default:
                    echo "Unknown Message Type\n";
                    break;
            }
            echo "\n";

            // Type 2 is CONNACK; byte 3 carries the return code (0 = accepted).
            if ($messageType == 2) {
                if (!isset($data[3]) || $data[3] !== chr(0)) {
                    // Refused connection: leave the liveness timestamp untouched.
                    return false;
                }
                if ($this->debug) echo "Connected to Broker\n";
            }

            // Any packet from the broker proves the link is alive.
            $this->timesinceping = time();
        });

        $this->tcpClient->on("close", function ($cli) {
            $cli->close(); // not required on swoole 1.6.10+
            echo "close\n";
        });

        $this->tcpClient->on("error", function ($cli) {
            exit("error\n");
        });

        if (!$this->tcpClient->connect($this->address, $this->port)) {
            exit("connect failed. Error: { " . $this->tcpClient->errCode . "}\n");
        }
        return;
    }

    /**
     * Builds and sends the MQTT CONNECT packet (protocol "MQIsdp", level 3).
     *
     * @param mixed       $cli      client handed over by the connect callback; unused,
     *                              the packet is always sent through $this->tcpClient
     * @param string      $clientid client identifier sent to the broker
     * @param bool        $clean    whether to set the clean-session flag
     * @param array|null  $will     optional will with keys 'topic', 'content', 'qos', 'retain'
     * @param string|null $username optional user name
     * @param string|null $password optional password
     * @return bool true when the packet was handed to the TCP client
     */
    public function connect($cli, $clientid, $clean = true, $will = NULL, $username = NULL, $password = NULL)
    {
        $this->clientid = $clientid;

        if ($will) $this->will = $will;
        if ($username) $this->username = $username;
        if ($password) $this->password = $password;

        $i = 0;

        // Variable header: length-prefixed protocol name followed by the protocol level.
        $buffer = $this->strwritestring('MQIsdp', $i);
        $buffer .= chr(0x03);
        $i++;

        // Connect flags.
        $var = 0;
        if ($clean) $var += 2;

        if ($this->will != NULL) {
            $willQos = isset($this->will['qos']) ? ((int) $this->will['qos'] & 0x03) : 0;
            $var += 4;                                      // will flag
            $var += ($willQos << 3);                        // will qos
            if (!empty($this->will['retain'])) $var += 32;  // will retain
        }

        if ($this->username != NULL) $var += 128;    // user name flag
        if ($this->password != NULL) $var += 64;     // password flag

        $buffer .= chr($var);
        $i++;

        // Keep alive, big-endian 16 bit.
        $buffer .= chr($this->keepalive >> 8);
        $i++;
        $buffer .= chr($this->keepalive & 0xff);
        $i++;

        // Payload: client id, then will topic/message, then credentials.
        $buffer .= $this->strwritestring($this->clientid, $i);

        if ($this->will != NULL) {
            $buffer .= $this->strwritestring($this->will['topic'], $i);
            $buffer .= $this->strwritestring($this->will['content'], $i);
        }

        if ($this->username) $buffer .= $this->strwritestring($this->username, $i);
        if ($this->password) $buffer .= $this->strwritestring($this->password, $i);

        // Fixed header: CONNECT (0x10) plus the variable-length "remaining length".
        // A single chr($i) would corrupt any packet longer than 127 bytes.
        $head = chr(0x10) . $this->setmsglength($i);

        return $this->tcpClient->send($head . $buffer) !== false;
    }


    /**
     * Publishes a message. The arguments are remembered so that the publish
     * can be replayed once the broker acknowledges the connection.
     *
     * @param string $topic   topic to publish to
     * @param string $content message payload
     * @param int    $qos     quality of service (0, 1 or 2)
     * @param int    $retain  non-zero to set the retain flag
     */
    public function pub($topic, $content, $qos = 0, $retain = 0)
    {
        $this->pubPara = array(
            'topic' => $topic,
            'content' => $content,
            'qos' => $qos,
            'retain' => $retain
        );
        $this->setPub = true;

        $i = 0;
        $buffer = $this->strwritestring($topic, $i);

        // A message id is only present for QoS > 0.
        if ($qos) {
            $id = $this->nextMsgId();
            $buffer .= chr($id >> 8);
            $i++;
            $buffer .= chr($id % 256);
            $i++;
        }

        $buffer .= $content;
        $i += strlen($content);

        // Fixed header: PUBLISH (0x30) with the QoS bits and the retain bit.
        $cmd = 0x30;
        if ($qos) $cmd += $qos << 1;
        if ($retain) $cmd += 1;

        $head = chr($cmd) . $this->setmsglength($i);

        if ($this->tcpClient->isConnected()) {
            $this->tcpClient->send($head . $buffer);

            // The publish went out, so it must not be replayed on CONNACK.
            $this->setPub = false;
        }
    }

    /**
     * Subscribes to a set of topics. The arguments are remembered so that the
     * subscription can be replayed once the broker acknowledges the connection.
     *
     * NOTE(review): MQTT 3.1.1 brokers require the SUBSCRIBE header flags to be
     * 0b0010, i.e. $qos = 1 here; the default of 0 is kept for backward
     * compatibility — confirm against the brokers in use.
     *
     * @param array $topics map of topic filter => array('qos' => int, 'function' => callable)
     * @param int   $qos    QoS bits placed in the fixed header of the SUBSCRIBE packet
     */
    public function sub($topics, $qos = 0)
    {
        $this->setSub = true;
        $this->subPara = array(
            'topics' => $topics,
            'qos' => $qos
        );

        $i = 0;
        $buffer = "";

        // Every SUBSCRIBE carries its own message id.
        $id = $this->nextMsgId();
        $buffer .= chr($id >> 8);
        $i++;
        $buffer .= chr($id % 256);
        $i++;

        foreach ($topics as $key => $topic) {
            $buffer .= $this->strwritestring($key, $i);
            $buffer .= chr(isset($topic["qos"]) ? (int) $topic["qos"] : 0);
            $i++;
            $this->topics[$key] = $topic;
        }

        // Fixed header: SUBSCRIBE (0x80) with the QoS bits, then the remaining length.
        $cmd = 0x80;
        $cmd += ($qos << 1);

        $head = chr($cmd) . $this->setmsglength($i);

        if ($this->tcpClient->isConnected()) {
            $this->tcpClient->send($head . $buffer);

            $this->setSub = false;
        }
    }

    /**
     * Returns the current message id and advances the counter, wrapping back
     * to 1 after 65535 because the id is transmitted as two bytes.
     *
     * @return int
     */
    private function nextMsgId()
    {
        $id = $this->msgid;
        $this->msgid = ($id >= 0xFFFF) ? 1 : $id + 1;
        return $id;
    }

    /**
     * Handles CONNACK: on success replays pending sub()/pub() requests and
     * forks a child process that writes a PINGREQ to the socket periodically.
     *
     * @param string $data raw CONNACK packet
     */
    private function dealConnACK($data)
    {
        if (strlen($data) < 4) {
            echo "Malformed CONNACK\n";
            return;
        }

        $retCode = ord($data[3]);
        if ($retCode == 0) {
            echo "Connect to Broker Success\n";

            // Replay requests that were issued before the connection was up.
            if ($this->setSub) {
                $this->sub(
                    $this->subPara['topics'],
                    $this->subPara['qos']);
            }
            if ($this->setPub) {
                $this->pub(
                    $this->pubPara['topic'],
                    $this->pubPara['content'],
                    $this->pubPara['qos'],
                    $this->pubPara['retain']
                );
            }

            echo "Swoole Client Sock:" . $this->tcpClient->sock . "\n";

            // Hand the socket descriptor to the ping process through a shared table.
            $table = new \swoole_table(5);
            $table->column('id', \swoole_table::TYPE_INT, 32);
            $table->create();

            $table->set('sock_id', array('id' => $this->tcpClient->sock));

            $process = new \swoole_process(function ($process) use ($table) {

                echo "====== in Sub Process ======\n";
                echo "table before:" . $table->count() . "\n";

                // get() may yield a non-array when the row is missing, so do not count() it.
                $row = $table->get('sock_id');
                if (empty($row) || !isset($row['id'])) {
                    echo "No Scokc\n";
                    $process->exit(0);
                    return;
                }
                $sock_id = $row['id'];

                echo "SockID: " . $sock_id . "\n";
                $sock = fopen("php://fd/" . $sock_id, 'w');
                if ($sock === false) {
                    echo "Open Sock Failed\n";
                    $process->exit(1);
                    return;
                }

                // Ping slightly before the keepalive expires; never sleep less than 1s.
                $interval = max(1, (int) $this->keepalive - 1);
                $ping = chr(0xc0) . chr(0x00);   // PINGREQ with zero remaining length

                while (true) {
                    sleep($interval);

                    $written = fwrite($sock, $ping);
                    if ($written === false || $written === 0) {
                        // The socket is gone; stop instead of looping forever.
                        echo "Ping Failed\n";
                        break;
                    }
                    echo "Sending Ping \n";
                }

                fclose($sock);
            });
            $process->start();

            echo "Create Ping Process Success\n";

        } else {
            echo "Connecto to Broker Failed. Error Code: " . $retCode;
        }
    }


    /**
     * Handles PINGRESP.
     *
     * @param string $data raw packet
     */
    private function dealPingResp($data)
    {
        echo "Ping Success\n";
    }

    /**
     * Handles PUBACK.
     *
     * @param string $data raw packet
     */
    private function dealPubACK($data)
    {
        echo "Pub ACK\n";
    }

    /**
     * Handles SUBACK.
     *
     * @param string $data raw packet
     */
    private function dealSubACK($data)
    {
        echo "Suc ACK\n";
    }

    /**
     * Handles an incoming PUBLISH: decodes topic and payload from the packet,
     * prints the payload and hands it to the matching subscription callbacks.
     *
     * @param string $data raw PUBLISH packet (fixed header included)
     */
    private function dealPublish($data)
    {
        // QoS occupies bits 1-2 of the first byte.
        $qos = (ord($data[0]) >> 1) & 0x03;

        // Skip the fixed header: type byte plus variable-length remaining length.
        $pos = 1;
        $remaining = $this->getmsglength($data, $pos);
        $body = (string) substr($data, $pos, $remaining);

        if (strlen($body) < 2) {
            echo "Malformed PUBLISH\n";
            return;
        }

        $tlen = (ord($body[0]) << 8) + ord($body[1]);
        $topic = (string) substr($body, 2, $tlen);

        // For QoS > 0 a two byte message id sits between topic and payload.
        $offset = 2 + $tlen;
        if ($qos > 0) {
            $offset += 2;
        }
        $msg = (string) substr($body, $offset);

        echo "-----------------------\n";
        echo "Receive Publish Message: $msg\n";
        echo "-----------------------\n";

        $this->dispatchMessage($topic, $msg);
    }

    /**
     * Handles PUBREC by printing the bytes following the fixed header.
     *
     * @param string $data raw packet
     */
    private function dealPubREC($data)
    {
        echo "REC: " . ord($data[2]) . "\n";
        echo "REC: " . ord($data[3]) . "\n";
        echo "REC: " . substr($data, 3, -1) . "\n";

    }


    /**
     * Decodes a PUBLISH body (two byte topic length, topic, payload) and
     * invokes the callbacks of all matching subscriptions.
     *
     * @param string $msg PUBLISH variable header and payload, without fixed header
     */
    public function message($msg)
    {
        if (strlen($msg) < 2) {
            if ($this->debug) echo "msg recieved but no match in subscriptions\n";
            return;
        }

        $tlen = (ord($msg[0]) << 8) + ord($msg[1]);
        $topic = (string) substr($msg, 2, $tlen);
        $payload = (string) substr($msg, ($tlen + 2));

        $this->dispatchMessage($topic, $payload);
    }

    /**
     * Calls the 'function' callback of every subscription whose topic filter
     * matches $topic. In a filter, "+" matches one level and "#" matches the rest.
     *
     * @param string $topic topic the message was published on
     * @param string $msg   message payload
     */
    private function dispatchMessage($topic, $msg)
    {
        $found = 0;
        foreach ($this->topics as $key => $top) {
            // Quote the filter so regex metacharacters in topic names are literal,
            // then turn the (quoted) MQTT wildcards into their regex equivalents.
            // "#" is only quoted by preg_quote() on newer PHP versions, hence both forms.
            $pattern = str_replace(
                array('\\+', '\\#', '#'),
                array('[^\/]*', '.*', '.*'),
                preg_quote($key, '/')
            );

            if (preg_match('/^' . $pattern . '$/', $topic)) {
                if (isset($top['function']) && is_callable($top['function'])) {
                    call_user_func($top['function'], $topic, $msg);
                    $found = 1;
                }
            }
        }

        if ($this->debug && !$found) echo "msg recieved but no match in subscriptions\n";
    }

    /**
     * Decodes the variable-length "remaining length" field starting at offset $i.
     * Stops at the end of the buffer if the field is truncated.
     *
     * @param string $msg buffer to read from
     * @param int    $i   read offset; advanced past the bytes consumed
     * @return int decoded length
     */
    public function getmsglength(&$msg, &$i)
    {
        $multiplier = 1;
        $value = 0;
        $available = strlen($msg);

        while ($i < $available) {
            $digit = ord($msg[$i]);
            $i++;
            $value += ($digit & 127) * $multiplier;
            // The top bit signals that another length byte follows.
            if (($digit & 128) == 0) {
                break;
            }
            $multiplier *= 128;
        }

        return $value;
    }


    /**
     * Encodes a length as the variable-length "remaining length" field:
     * seven bits per byte, least significant group first.
     *
     * @param int $len length to encode
     * @return string encoded bytes
     */
    public function setmsglength($len)
    {
        $string = "";
        do {
            $digit = $len % 128;
            $len = $len >> 7;
            // if there are more digits to encode, set the top bit of this digit
            if ($len > 0)
                $digit = ($digit | 0x80);
            $string .= chr($digit);
        } while ($len > 0);
        return $string;
    }

    /**
     * Returns $str prefixed with its length as a big-endian 16 bit integer
     * and adds the number of bytes produced to $i.
     *
     * NOTE(review): strings longer than 65535 bytes do not fit the two byte
     * prefix; no check is performed here.
     *
     * @param string $str string to encode
     * @param int    $i   running byte counter, increased by strlen($str) + 2
     * @return string length-prefixed string
     */
    public function strwritestring($str, &$i)
    {
        $len = strlen($str);
        $msb = $len >> 8;
        $lsb = $len % 256;
        $ret = chr($msb);
        $ret .= chr($lsb);
        $ret .= $str;
        $i += ($len + 2);
        return $ret;
    }

    /**
     * Debug helper: prints every byte of $string as index, binary, hex and,
     * for byte values above 31, the character itself.
     *
     * @param string $string bytes to dump
     */
    public function printstr($string)
    {
        $strlen = strlen($string);
        for ($j = 0; $j < $strlen; $j++) {
            $num = ord($string[$j]);
            if ($num > 31)
                $chr = $string[$j];
            else $chr = " ";
            printf("%4d: %08b : 0x%02x : %s \n", $j, $num, $num, $chr);
        }
    }

}
